
[flink] optimize flink Data-Evolution-Merge-Into#7324

Merged
JingsongLi merged 3 commits into apache:master from
steFaiz:optim_flink_de_merge_into
Mar 3, 2026

Conversation

@steFaiz
Contributor

@steFaiz steFaiz commented Mar 2, 2026

Purpose

This PR optimizes Data-Evolution-Merge-Into in several respects:

  1. Introduce a MergeIntoUpdateChecker to check whether any global-indexed columns are updated (this is the same as Spark's implementation).
  2. Use Calcite to rename the target table (the current implementation is based on regex, which is very fragile).
  3. Use Calcite to find the _row_id field (if it exists) in the source table, so that the join can be eliminated.
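To illustrate why item 2 moves away from regex, here is a minimal sketch of a whole-word regex rename going wrong. It is not code from this PR: the class name `RegexRenameSketch`, the method `renameWithRegex`, and the sample SQL are all hypothetical, and the regex the action actually used may differ. The point is only that a pattern cannot tell a table reference from the same token inside a string literal, whereas a SQL parser such as Calcite can.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration only; not the implementation replaced by this PR. */
final class RegexRenameSketch {

    private RegexRenameSketch() {}

    /**
     * Naively replaces every whole-word occurrence of oldName with newName.
     * The pattern has no notion of SQL syntax, so string literals and
     * comments are rewritten along with real table references.
     */
    static String renameWithRegex(String sql, String oldName, String newName) {
        Pattern wholeWord = Pattern.compile("\\b" + Pattern.quote(oldName) + "\\b");
        return wholeWord.matcher(sql).replaceAll(Matcher.quoteReplacement(newName));
    }
}
```

For example, renaming `T` to `RT` in `UPDATE T SET note = 'copied from T' WHERE T.id = 1` also rewrites the string literal to `'copied from RT'`, which silently changes the data being written.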

Linked issue: None

Tests

Please see: org.apache.paimon.flink.action.DataEvolutionMergeIntoActionITCase.

API and Format

None

Documentation

None

Generative AI tooling

Fully hand-written.

@JingsongLi
Contributor

Maybe you can just use CommitterOperator with commit.strict-mode.last-safe-snapshot configured in DataEvolutionMergeIntoAction.

@steFaiz
Contributor Author

steFaiz commented Mar 2, 2026

> Maybe you can just use CommitterOperator with commit.strict-mode.last-safe-snapshot configured in DataEvolutionMergeIntoAction.

@JingsongLi Thanks for your advice! I've modified my code.

@JingsongLi
Contributor

I mean you can configure commit.strict-mode.last-safe-snapshot with the scanned snapshot id; this should be the same snapshot id that the MERGE INTO source uses.

And you don't need to introduce MergeIntoCommitterOperator; just use CommitterOperator. commit.strict-mode.last-safe-snapshot will solve your FilterAndCommit problem.
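As a rough sketch of the suggestion above, the option can be expressed as a plain key/value override on the table options. Only the key `commit.strict-mode.last-safe-snapshot` comes from this thread; the class `StrictModeOptionSketch`, the method `withLastSafeSnapshot`, and the idea of carrying the options in a `Map` are assumptions made for illustration, and how the action actually hands the options to the table is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; only the option key is taken from the discussion. */
final class StrictModeOptionSketch {

    static final String LAST_SAFE_SNAPSHOT_KEY = "commit.strict-mode.last-safe-snapshot";

    private StrictModeOptionSketch() {}

    /**
     * Returns a copy of the given options with the strict-mode key set to the
     * snapshot id that the MERGE INTO source scanned. The input map is not modified.
     */
    static Map<String, String> withLastSafeSnapshot(
            Map<String, String> tableOptions, long scannedSnapshotId) {
        Map<String, String> copy = new HashMap<>(tableOptions);
        copy.put(LAST_SAFE_SNAPSHOT_KEY, Long.toString(scannedSnapshotId));
        return copy;
    }
}
```

The important design point is that the same snapshot id is used twice: once to pin what the source reads, and once as the last safe snapshot for the commit.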

@steFaiz
Contributor Author

steFaiz commented Mar 2, 2026

> I mean you can configure commit.strict-mode.last-safe-snapshot with the scanned snapshot id; this should be the same snapshot id that the MERGE INTO source uses.
>
> And you don't need to introduce MergeIntoCommitterOperator; just use CommitterOperator. commit.strict-mode.last-safe-snapshot will solve your FilterAndCommit problem.

@JingsongLi Thanks for your explanation! I've set commit.strict-mode.last-safe-snapshot to the latestSnapshotId in the action's constructor.

I introduced the new committer operator, which wraps a CommitterOperator, to check the received committables, so that we can throw an error or drop the partition index if the merge-into action modified some global-indexed columns.

An alternative approach is to introduce a new operator before the committer, e.g. UpdateCheckerOperator. Which one do you prefer? I'm happy to modify my code.
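The core of such a check, independent of which operator hosts it, might look like the sketch below. This is not the PR's MergeIntoUpdateChecker: the class `UpdateCheckSketch` and both method names are hypothetical, columns are modelled as plain strings, and only the "throw an error" branch is shown (the alternative of dropping the index is left out).

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the check; not the PR's MergeIntoUpdateChecker. */
final class UpdateCheckSketch {

    private UpdateCheckSketch() {}

    /** Returns the updated columns that are also global-indexed, in update order. */
    static Set<String> updatedIndexedColumns(
            List<String> updatedColumns, Set<String> globalIndexedColumns) {
        Set<String> hits = new LinkedHashSet<>();
        for (String column : updatedColumns) {
            if (globalIndexedColumns.contains(column)) {
                hits.add(column);
            }
        }
        return hits;
    }

    /** Fails fast if the update touches any global-indexed column. */
    static void checkOrThrow(List<String> updatedColumns, Set<String> globalIndexedColumns) {
        Set<String> hits = updatedIndexedColumns(updatedColumns, globalIndexedColumns);
        if (!hits.isEmpty()) {
            throw new IllegalStateException(
                    "MERGE INTO updates global-indexed columns: " + hits);
        }
    }
}
```

Keeping the check as a pure function like this means it can sit either inside a wrapping committer or in a separate operator placed before the committer, which is the choice being discussed.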

@JingsongLi
Contributor

@steFaiz Sounds good to me.

@steFaiz
Contributor Author

steFaiz commented Mar 3, 2026

> @steFaiz Sounds good to me.

@JingsongLi I appreciate it! Do you mean I should introduce an UpdateCheckerOperator, or keep the new committer (and just resolve the conflict)?

@JingsongLi
Contributor

> > @steFaiz Sounds good to me.
>
> @JingsongLi I appreciate it! Do you mean I should introduce an UpdateCheckerOperator, or keep the new committer (and just resolve the conflict)?

Introduce an UpdateCheckerOperator.

@steFaiz steFaiz force-pushed the optim_flink_de_merge_into branch from aafd973 to d8cd94f on March 3, 2026 13:34
@JingsongLi
Contributor

+1

@JingsongLi JingsongLi merged commit e102abd into apache:master Mar 3, 2026
10 of 12 checks passed